Skip to content

Commit

Permalink
Tablet throttler: be explicit about client app name, exempt some apps…
Browse files Browse the repository at this point in the history
… from checks and heartbeat renewals (#13195)

* Tablet throttler: non-PRIMARY tablets report back to PRIMARY throttler when they've been 'check'ed

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* inclusive language

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* vstreamer: support 'useThrottler' so that clients can choose whther they at all want to involve the throttler.
Some lightweight clients, such as the schema tracker or the binlog watcher, or messager, do not need the throttler, and since some of these clients are _always on_, we also
do not _want_ them to continuously approach the throttler. One side effect of always engaging with the throttler is the infinite renewal of on-demand heartbeats

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* add app name in test

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Enforcing use of explicit names in all throttler checks, specifically in vstreamer.Engine. The throttler exempts specific apps from checks and will not renew heartbeats leases for those apps

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* new formal throttler app names placeholder

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* formalize throttlerapp.Name

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

* Refactor names in endtoend tests

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

---------

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Jun 1, 2023
1 parent e6a321b commit 374b94c
Show file tree
Hide file tree
Showing 37 changed files with 291 additions and 177 deletions.
64 changes: 31 additions & 33 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,18 @@ import (
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)
onlineDDLThrottlerAppName = "online-ddl"
vstreamerThrottlerAppName = "vstreamer"
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)

normalMigrationWait = 45 * time.Second
extendedMigrationWait = 60 * time.Second
Expand Down Expand Up @@ -234,13 +232,13 @@ func throttleResponse(tablet *cluster.Vttablet, path string) (respBody string, e
}

// direct per-tablet throttler API instruction
func throttleApp(tablet *cluster.Vttablet, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app))
func throttleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp))
}

// direct per-tablet throttler API instruction
func unthrottleApp(tablet *cluster.Vttablet, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app))
func unthrottleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp))
}

func TestSchemaChange(t *testing.T) {
Expand Down Expand Up @@ -385,7 +383,7 @@ func TestSchemaChange(t *testing.T) {
// begin throttling:
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
Expand All @@ -400,11 +398,11 @@ func TestSchemaChange(t *testing.T) {
// to be strictly higher than started_timestamp
assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VCopierComponentName), string(vreplication.VPlayerComponentName)}, component)
assert.Contains(t, []string{throttlerapp.VCopierName.String(), throttlerapp.VPlayerName.String()}, component)

// unthrottle
onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, false)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)

status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
Expand All @@ -421,11 +419,11 @@ func TestSchemaChange(t *testing.T) {
// vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't
// change the essence of this test.
for _, tablet := range shard.Vttablets {
body, err := throttleApp(tablet, vstreamerThrottlerAppName)
defer unthrottleApp(tablet, vstreamerThrottlerAppName)
body, err := throttleApp(tablet, throttlerapp.VStreamerName)
defer unthrottleApp(tablet, throttlerapp.VStreamerName)

assert.NoError(t, err)
assert.Contains(t, body, vstreamerThrottlerAppName)
assert.Contains(t, body, throttlerapp.VStreamerName)
}
}

Expand All @@ -449,7 +447,7 @@ func TestSchemaChange(t *testing.T) {
// clock irregularities
assert.GreaterOrEqual(t, lastThrottledTime.Add(time.Second), startedTime)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VStreamerComponentName), string(vreplication.RowStreamerComponentName)}, component)
assert.Contains(t, []string{throttlerapp.VStreamerName.String(), throttlerapp.RowStreamerName.String()}, component)
}()
// now unthrottled
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
Expand Down Expand Up @@ -478,7 +476,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand All @@ -497,7 +495,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand Down Expand Up @@ -531,16 +529,16 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = throttleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
defer unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
body, err = throttleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
defer unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = throttleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
body, err = throttleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
defer unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

Expand Down Expand Up @@ -596,14 +594,14 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
body, err = unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
body, err = unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
})
t.Run("expect completion", func(t *testing.T) {
Expand Down Expand Up @@ -823,11 +821,11 @@ func TestSchemaChange(t *testing.T) {
// - tablet throttling
t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) {
// shard 0 will run normally, shard 1 will be throttled
defer unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
t.Run("throttle shard 1", func(t *testing.T) {
body, err := throttleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
body, err := throttleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
})

var uuid string
Expand All @@ -849,9 +847,9 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckCancelAllMigrations(t, &vtParams, 1)
})
t.Run("unthrottle shard 1", func(t *testing.T) {
body, err := unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
body, err := unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
})
var revertUUID string
t.Run("issue revert migration", func(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/test/endtoend/cluster"

Expand Down Expand Up @@ -311,17 +312,17 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) {
}

// CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, appName string, expectFind bool) {
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

found := false
for _, row := range r.Named().Rows {
if row.AsString("app", "") == appName {
if throttlerApp.Equals(row.AsString("app", "")) {
found = true
}
}
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", appName, found)
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found)
}

// WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri
}

func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath))
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath))
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

const (
Expand Down Expand Up @@ -125,12 +126,12 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s

// waitForTabletThrottlingStatus waits for the tablet to return the provided HTTP code for
// the provided app name in its self check.
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, appName string, wantCode int64) {
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int64) {
var gotCode int64
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
output, err := throttlerCheckSelf(tablet, appName)
output, err := throttlerCheckSelf(tablet, throttlerApp)
require.NoError(t, err)

gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode")
Expand All @@ -144,7 +145,7 @@ func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess
select {
case <-timer.C:
require.FailNow(t, fmt.Sprintf("tablet %q did not return expected status of %d for application %q before the timeout of %s; last seen status: %d",
tablet.Name, wantCode, appName, defaultTimeout, gotCode))
tablet.Name, wantCode, throttlerApp, defaultTimeout, gotCode))
default:
time.Sleep(defaultTick)
}
Expand Down
17 changes: 9 additions & 8 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)

Expand All @@ -59,8 +60,8 @@ var (
sourceKsOpts = make(map[string]string)
targetKsOpts = make(map[string]string)
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = "vstreamer"
targetThrottlerAppName = "vreplication"
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VReplicationName
)

const (
Expand Down Expand Up @@ -95,16 +96,16 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody st
return respBody, err
}

func throttleApp(tablet *cluster.VttabletProcess, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app))
func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp.String()))
}

func unthrottleApp(tablet *cluster.VttabletProcess, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app))
func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String()))
}

func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app)
func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String())
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", err
Expand Down
17 changes: 9 additions & 8 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

Expand Down Expand Up @@ -124,7 +125,6 @@ var (
migrationFailureFileName = "migration-failure.log"
onlineDDLUser = "vt-online-ddl-internal"
onlineDDLGrant = fmt.Sprintf("'%s'@'%s'", onlineDDLUser, "%")
throttlerOnlineDDLApp = "online-ddl"
throttleCheckFlags = &throttle.CheckFlags{}
)

Expand Down Expand Up @@ -1626,7 +1626,7 @@ exit $exit_code
fmt.Sprintf("--serve-socket-file=%s", serveSocketFile),
fmt.Sprintf("--hooks-path=%s", tempDir),
fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:gh-ost:%s&p=low`, servenv.Port(), throttlerOnlineDDLApp, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:%s:%s&p=low`, servenv.Port(), throttlerapp.OnlineDDLName, throttlerapp.GhostName, onlineDDL.UUID),
fmt.Sprintf(`--database=%s`, e.dbName),
fmt.Sprintf(`--table=%s`, onlineDDL.Table),
fmt.Sprintf(`--alter=%s`, alterOptions),
Expand Down Expand Up @@ -1773,7 +1773,7 @@ export MYSQL_PWD
my ($self, %args) = @_;
return sub {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:pt-osc:{{MIGRATION_UUID}}&p=low")) {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:{{THROTTLER_PT_OSC_APP}}:{{MIGRATION_UUID}}&p=low")) {
# Got HTTP 200 OK, means throttler is happy
return 0;
} else {
Expand All @@ -1787,7 +1787,8 @@ export MYSQL_PWD
`
pluginCode = strings.ReplaceAll(pluginCode, "{{VTTABLET_PORT}}", fmt.Sprintf("%d", servenv.Port()))
pluginCode = strings.ReplaceAll(pluginCode, "{{MIGRATION_UUID}}", onlineDDL.UUID)
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerOnlineDDLApp)
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName.String())
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName.String())

pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusRunning}}", string(schema.OnlineDDLStatusRunning))
pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusComplete}}", string(schema.OnlineDDLStatusComplete))
Expand Down Expand Up @@ -2146,7 +2147,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin
if err := e.lagThrottler.CheckIsReady(); err != nil {
return nil, err
}
_ = e.lagThrottler.ThrottleApp(throttlerOnlineDDLApp, time.Now().Add(duration), ratio)
_ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName.String(), time.Now().Add(duration), ratio)
return emptyResult, nil
}

Expand All @@ -2166,7 +2167,7 @@ func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltype
return nil, err
}
defer e.triggerNextCheckInterval()
_ = e.lagThrottler.UnthrottleApp(throttlerOnlineDDLApp)
_ = e.lagThrottler.UnthrottleApp(throttlerapp.OnlineDDLName.String())
return emptyResult, nil
}

Expand Down Expand Up @@ -3474,7 +3475,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if err := e.lagThrottler.CheckIsReady(); err == nil {
// No point in reviewing throttler info if it's not enabled&open
for _, app := range e.lagThrottler.ThrottledApps() {
if app.AppName == throttlerOnlineDDLApp {
if throttlerapp.OnlineDDLName.Equals(app.AppName) {
currentUserThrottleRatio = app.Ratio
break
}
Expand Down Expand Up @@ -3608,7 +3609,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
// - it's a deadlock.
// And so, once per reviewRunningMigrations(), and assuming there _are_ running migrations, we ensure to hit a throttler check. This will kick
// on-demand heartbeats, unlocking the deadlock.
e.lagThrottler.CheckByType(ctx, throttlerOnlineDDLApp, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite)
e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite)
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand Down Expand Up @@ -235,7 +236,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS
if vstreamHook != nil {
vstreamHook(ctx)
}
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send)
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send)
}

// vstreamRowsHook allows you to do work just before calling VStreamRows.
Expand Down
Loading

0 comments on commit 374b94c

Please sign in to comment.