From 1b4790b9aae14c080dd6fd58eec846d997488f54 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Tue, 24 Oct 2023 11:17:41 +0300 Subject: [PATCH 1/3] Cherry-pick 94572b3f0c42d18c01d365f30803e3472ea75e0d with conflicts --- .../onlineddl_vrepl_mini_stress_test.go | 51 ++++++++++++++++++- go/test/endtoend/onlineddl/vtgate_util.go | 15 ++++-- 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 107050c2708..568c912ccf8 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -23,6 +23,7 @@ import ( "math/rand" "os" "path" + "runtime" "strings" "sync" "sync/atomic" @@ -136,12 +137,22 @@ var ( writeMetrics WriteMetrics ) +var ( + countIterations = 5 +) + const ( +<<<<<<< HEAD maxTableRows = 4096 maxConcurrency = 20 singleConnectionSleepInterval = 2 * time.Millisecond countIterations = 5 migrationWaitTimeout = 60 * time.Second +======= + maxTableRows = 4096 + workloadDuration = 5 * time.Second + migrationWaitTimeout = 60 * time.Second +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) ) func resetOpOrder() { @@ -371,6 +382,9 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + rowcount := 0 for { @@ -382,7 +396,7 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri } select { - case <-time.After(time.Second): + case <-ticker.C: continue // Keep looping case <-ctx.Done(): // Break below to the assertion @@ -485,7 +499,11 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } +<<<<<<< HEAD func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { +======= +func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -496,6 +514,9 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) require.Nil(t, err) + ticker := time.NewTicker(sleepInterval) + defer ticker.Stop() + for { if atomic.LoadInt64(done) == 1 { log.Infof("Terminating single connection") @@ -509,20 +530,48 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { case 2: err = generateDelete(t, conn) } +<<<<<<< HEAD +======= + select { + case <-ctx.Done(): + log.Infof("Terminating single connection") + return + case <-ticker.C: + } +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) assert.Nil(t, err) time.Sleep(singleConnectionSleepInterval) } } func runMultipleConnections(ctx context.Context, t *testing.T) { +<<<<<<< HEAD log.Infof("Running multiple connections") var done int64 +======= + // The workload for a 16 vCPU machine is: + // - Concurrency of 16 + // - 2ms interval between queries for each connection + // As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine + // we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms. + maxConcurrency := runtime.NumCPU() + sleepModifier := 16.0 / float64(maxConcurrency) + baseSleepInterval := 2 * time.Millisecond + singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier + sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) + + log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() +<<<<<<< HEAD runSingleConnection(ctx, t, &done) +======= + runSingleConnection(ctx, t, sleepInterval) +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) }() } <-ctx.Done() diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 3d99a2cef92..693523cec48 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -247,9 +247,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c for _, status := range expectStatuses { statusesMap[string(status)] = true } - startTime := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + lastKnownStatus := "" - for time.Since(startTime) < timeout { + for { countMatchedShards := 0 r := VtgateExecQuery(t, vtParams, query, "") for _, row := range r.Named().Rows { @@ -266,9 +270,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c if countMatchedShards == len(shards) { return schema.OnlineDDLStatus(lastKnownStatus) } - time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } } - return schema.OnlineDDLStatus(lastKnownStatus) } // CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts From 681ce00bf4a50f56feb635c4561fffdf56489555 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:17:28 +0300 Subject: [PATCH 2/3] resolved conflict Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl_vrepl_mini_stress_test.go | 51 +++++-------------- 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 568c912ccf8..7f560a24f9e 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -26,7 +26,6 @@ import ( "runtime" "strings" "sync" - "sync/atomic" "testing" "time" @@ -142,17 +141,9 @@ var ( ) const ( -<<<<<<< HEAD - maxTableRows = 4096 - maxConcurrency = 20 - singleConnectionSleepInterval = 2 * time.Millisecond - countIterations = 5 - migrationWaitTimeout = 60 * time.Second -======= maxTableRows = 4096 workloadDuration = 5 * time.Second migrationWaitTimeout = 60 * time.Second ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) ) func resetOpOrder() { @@ -238,6 +229,8 @@ func TestMain(m *testing.M) { func TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) + ctx := context.Background() + shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) @@ -262,16 +255,17 @@ func TestSchemaChange(t *testing.T) { // that our testing/metrics logic is sound in the first place. testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) initTable(t) + + ctx, cancel := context.WithTimeout(ctx, workloadDuration) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() runMultipleConnections(ctx, t) }() - time.Sleep(5 * time.Second) - cancel() // will cause runMultipleConnections() to terminate wg.Wait() testSelectTableMetrics(t) }) @@ -296,7 +290,7 @@ func TestSchemaChange(t *testing.T) { // the vreplication/ALTER TABLE did not corrupt our data and we are happy. testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() t.Run("create schema", func(t *testing.T) { testWithInitialSchema(t) }) @@ -304,6 +298,9 @@ func TestSchemaChange(t *testing.T) { initTable(t) }) t.Run("migrate", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -313,7 +310,7 @@ func TestSchemaChange(t *testing.T) { hint := fmt.Sprintf("hint-alter-with-workload-%d", i) uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) - cancel() // will cause runMultipleConnections() to terminate + cancel() // Now that the migration is complete, we can stop the workload. wg.Wait() }) t.Run("validate metrics", func(t *testing.T) { @@ -499,11 +496,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } -<<<<<<< HEAD -func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { -======= func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -518,10 +511,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.D defer ticker.Stop() for { - if atomic.LoadInt64(done) == 1 { - log.Infof("Terminating single connection") - return - } switch rand.Int31n(3) { case 0: err = generateInsert(t, conn) @@ -530,25 +519,17 @@ func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.D case 2: err = generateDelete(t, conn) } -<<<<<<< HEAD -======= select { case <-ctx.Done(): log.Infof("Terminating single connection") return case <-ticker.C: } ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) assert.Nil(t, err) - time.Sleep(singleConnectionSleepInterval) } } func runMultipleConnections(ctx context.Context, t *testing.T) { -<<<<<<< HEAD - log.Infof("Running multiple connections") - var done int64 -======= // The workload for a 16 vCPU machine is: // - Concurrency of 16 // - 2ms interval between queries for each connection @@ -561,24 +542,16 @@ func runMultipleConnections(ctx context.Context, t *testing.T) { sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() -<<<<<<< HEAD - runSingleConnection(ctx, t, &done) -======= runSingleConnection(ctx, t, sleepInterval) ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) }() } - <-ctx.Done() - atomic.StoreInt64(&done, 1) - log.Infof("Running multiple connections: done") wg.Wait() - log.Infof("All connections cancelled") + log.Infof("Running multiple connections: done") } func initTable(t *testing.T) { From 66231968894e01ad3d7b2828ae175f198f5069d1 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:34:26 +0300 Subject: [PATCH 3/3] empty commit to kick CI Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>